package com.mzj.saas.commons.util;

import java.io.IOException;
import java.nio.file.FileSystem;
import java.nio.file.FileSystems;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.WatchEvent;
import java.nio.file.WatchEvent.Kind;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static java.nio.file.StandardWatchEventKinds.* ;

/**
 * Watches directories through a {@link WatchService} and forwards matching
 * entry events to {@link Callback}s.
 *
 * <p>Tasks are queued by the {@code watch(...)} methods; the first call starts a
 * single background thread named {@code "FileSystemWatcher"} which registers the
 * queued directories, polls the watch service and invokes the callbacks. All
 * callbacks therefore run on that thread.
 *
 * <p>Once {@link #destroy()} has stopped the thread, the instance does not start
 * a new one; create a new watcher instead.
 */
public class FileSystemWatcher
{
	private static final Logger LOG = LoggerFactory.getLogger( FileSystemWatcher.class ) ;

	/** Event kinds used when a task does not carry its own. */
	private static final Kind<?>[] DEFAULT_EVENTS = { OVERFLOW, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY } ;

	/** Maximum time, in milliseconds, one poll of the watch service blocks. */
	private static final long POLL_MILLIS = 100 ;

	/** Shared watcher bound to the default file system. */
	public static FileSystemWatcher DEFAULT = new FileSystemWatcher() ;

	FileSystem fs ;
	WatchService watcher;
	/** Every task handed to {@code watch(...)} that has not been canceled and removed yet. */
	ConcurrentLinkedQueue<WatchTask> tasks = new ConcurrentLinkedQueue<FileSystemWatcher.WatchTask>() ;

	/** Number of accepted {@code watch(WatchTask)} calls; the first one starts the thread. */
	AtomicInteger counter = new AtomicInteger() ;
	Thread thread = null ;

	WatchTask[] injectTasks ;

	/**
	 * Starts watching every task supplied through {@link #setInjectTasks(WatchTask[])}.
	 * Does nothing when no tasks were supplied.
	 */
	public void init()
	{
		if( injectTasks == null )
			return ;
		for( WatchTask task:injectTasks )
			watch( task ) ;
	}

	/**
	 * Stops the watcher thread, if one was started, and closes the underlying
	 * watch service so that its system resources are released. Calling this
	 * before any task was watched has no effect.
	 */
	public void destroy()
	{
		Thread running = thread ;
		if( running == null )
			return ;

		thread = null ;
		running.interrupt();
		try
		{
			// The thread is never restarted after this point, so the service would
			// otherwise stay open for the rest of the process lifetime.
			watcher.close();
		}
		catch( IOException e )
		{
			LOG.error( "Error occured, {}", e.getMessage() );
			if( LOG.isDebugEnabled() )
				LOG.error( null, e );
		}
	}

	/** Sets the tasks that {@link #init()} will start watching. */
	public void setInjectTasks( WatchTask[] tasks )
	{
		this.injectTasks = tasks ;
	}

	/** Returns the tasks set through {@link #setInjectTasks(WatchTask[])}, possibly {@code null}. */
	public WatchTask[] getInjectTasks()
	{
		return injectTasks ;
	}

	/** Creates a watcher on the default file system. */
	public FileSystemWatcher()
	{
		this( FileSystems.getDefault() ) ;
	}

	/**
	 * Creates a watcher on the given file system.
	 *
	 * @throws RuntimeException wrapping the {@link IOException} raised when the
	 *         file system cannot create a watch service
	 */
	public FileSystemWatcher( FileSystem fs )
	{
		this.fs = fs ;
		try
		{
			watcher = fs.newWatchService() ;
		}
		catch (IOException e)
		{
			throw new RuntimeException(e) ;
		}
	}

	/** Watches every entry of {@code dir}; see {@link #watch(WatchTask)} for the return value. */
	public WatchTask watch( String dir, Callback callback )
	{
		return watch( fs.getPath( dir), null, null, callback ) ;
	}

	/** Watches the entries of {@code dir} whose name ends with {@code suffix}. */
	public WatchTask watch( String dir, String suffix, Callback callback )
	{
		return watch( dir, suffix, null, callback ) ;
	}

	/** Watches the entries of {@code dir} whose name fully matches {@code pattern}. */
	public WatchTask watch( String dir, Pattern pattern, Callback callback )
	{
		return watch( dir, null, pattern, callback ) ;
	}

	/** Watches the entries of {@code dir} whose name ends with {@code suffix} or fully matches {@code pattern}. */
	public WatchTask watch( String dir, String suffix, Pattern pattern, Callback callback )
	{
		return watch( fs.getPath( dir), suffix, pattern, callback, DEFAULT_EVENTS.clone() ) ;
	}

	/** Watches every entry of {@code dir}; see {@link #watch(WatchTask)} for the return value. */
	public WatchTask watch( Path dir, Callback callback )
	{
		return watch( dir, null, null, callback, DEFAULT_EVENTS.clone() ) ;
	}

	/** Watches the entries of {@code dir} whose name ends with {@code suffix}. */
	public WatchTask watch( Path dir, String suffix, Callback callback )
	{
		return watch( dir, suffix, null, callback ) ;
	}

	/** Watches the entries of {@code dir} whose name fully matches {@code pattern}. */
	public WatchTask watch( Path dir, Pattern pattern, Callback callback )
	{
		return watch( dir, null, pattern, callback ) ;
	}

	/** Watches the entries of {@code dir} whose name ends with {@code suffix} or fully matches {@code pattern}. */
	public WatchTask watch( Path dir, String suffix, Pattern pattern, Callback callback )
	{
		return watch( dir, suffix, pattern, callback, DEFAULT_EVENTS.clone() ) ;
	}

	/** Builds a task from the individual settings and hands it to {@link #watch(WatchTask)}. */
	@SuppressWarnings("unchecked")
	private WatchTask watch( Path dir, String suffix, Pattern pattern, Callback callback, Kind<?> ...events )
	{
		WatchTask task = new WatchTask() ;
		task.setDir(dir);
		task.setSuffix(suffix);
		task.setPattern(pattern);
		task.setCallback(callback);

		// The array is only ever passed back to Path.register, which accepts Kind<?>.
		task.setEvents((Kind<Path>[])events);

		return watch( task ) ;
	}

	/**
	 * Queues {@code task} for the watcher thread, starting that thread on the
	 * first accepted task. The directory is registered asynchronously by the
	 * watcher thread; a directory that does not exist yet is retried on every
	 * pass of that thread.
	 *
	 * @return {@code task}, or {@code null} when the task could not be queued or
	 *         the watcher thread could not be started
	 * @throws NullPointerException if {@code task} is {@code null}
	 */
	public WatchTask watch( WatchTask task )
	{
		if( !tasks.offer( task ) )
			return null ;

		try
		{
			if( counter.incrementAndGet() == 1 )
			{
				thread = new Thread( new Processor(), "FileSystemWatcher" ) ;
				thread.start();
			}

			return task ;
		}
		catch( Throwable err )
		{
			LOG.error( null, err );
			return null ;
		}
	}

	/**
	 * Decides whether an entry is of interest to a task.
	 *
	 * @return {@code false} when the name ends with the task's non-empty temp
	 *         suffix, or when the task has a suffix and/or pattern and the name
	 *         satisfies neither; {@code true} otherwise
	 */
	private boolean doFilter( WatchTask task, String entryName )
	{
		if( task.tmp != null && task.tmp.length() > 0 && entryName.endsWith( task.tmp ) )
			return false ;

		if( task.suffix == null && task.pattern == null )
			return true ;

		if( task.suffix != null && entryName.endsWith( task.suffix ) )
			return true ;

		return task.pattern != null && task.pattern.matcher( entryName ).matches() ;
	}

	/**
	 * Invokes the task's callback, if any. Anything the callback throws is
	 * logged and swallowed so that it cannot stop the watcher thread.
	 */
	private void doCallback( WatchTask task, Path path, int event )
	{
		if( task.callback == null )
			return ;
		try
		{
			task.callback.process(path, event);
		}
		catch( Throwable err )
		{
			LOG.error("Error occured, {}", err.getMessage());
			if (LOG.isDebugEnabled())
				LOG.error(null, err);
		}
	}

	/** Maps a watch event kind to the matching {@link Callback} constant. */
	private static int toCallbackType( Kind<?> kind )
	{
		if( kind == ENTRY_CREATE )
			return Callback.CREATE ;
		if( kind == ENTRY_MODIFY )
			return Callback.MODIFY ;
		if( kind == ENTRY_DELETE )
			return Callback.DELETE ;
		return Callback.OTHER ;
	}

	/**
	 * Body of the watcher thread: polls the watch service, dispatches events and
	 * walks the task queue to register, flush or remove tasks.
	 */
	private final class Processor implements Runnable
	{
		/**
		 * Registered tasks grouped by watch key. Registering a directory that is
		 * already registered yields the same key, so several tasks can share one.
		 */
		private final Map<WatchKey, HashSet<WatchTask>> keyTasks = new HashMap<WatchKey, HashSet<WatchTask>>() ;

		@Override
		public void run()
		{
			for (;;)
			{
				if( Thread.interrupted() )
					return ;

				try
				{
					WatchKey key = watcher.poll( POLL_MILLIS, TimeUnit.MILLISECONDS ) ;
					if( key != null )
						drain( key ) ;

					maintainTasks() ;
				}
				catch( InterruptedException err )
				{
					return ;
				}
				catch( java.nio.file.ClosedWatchServiceException err )
				{
					// destroy() closed the service; nothing left to watch.
					return ;
				}
				catch( RuntimeException err )
				{
					// Keep the thread alive: one failing pass must not end all watching.
					LOG.error( "Error occured, {}", err.getMessage() );
					if( LOG.isDebugEnabled() )
						LOG.error( null, err );
				}
			}
		}

		/** Delivers all events queued on {@code key} to the tasks sharing it, then resets the key. */
		@SuppressWarnings("unchecked")
		private void drain( WatchKey key )
		{
			try
			{
				// null when the key's tasks are gone but events were still queued
				HashSet<WatchTask> listeners = keyTasks.get( key ) ;

				for( WatchEvent<?> event : key.pollEvents() )
				{
					Kind<?> kind = event.kind() ;
					if( kind == OVERFLOW || listeners == null )
						continue ;

					String entryName = ((WatchEvent<Path>) event).context().getFileName().toString() ;
					int ctype = toCallbackType( kind ) ;

					for( WatchTask task : listeners )
						dispatch( task, entryName, ctype ) ;
				}
			}
			finally
			{
				// Without a reset the key receives no further events. A failed reset
				// means the key is no longer valid, so its mapping is dropped; the
				// affected tasks are not registered again.
				if( !key.reset() )
					keyTasks.remove( key ) ;
			}
		}

		/**
		 * Hands one event to one task: immediately, or - when the task reduces
		 * events - by recording it so that it is delivered once the entry has
		 * been quiet for the task's interval. Deletes are always delivered
		 * immediately and discard any recorded event for the entry.
		 */
		private void dispatch( WatchTask task, String entryName, int ctype )
		{
			if( task.canceled || !doFilter( task, entryName ) )
				return ;

			Path entryPath = task.dir.resolve( entryName ) ;

			if( !task.reduceEvents )
			{
				doCallback( task, entryPath, ctype ) ;
				return ;
			}

			if( ctype == Callback.DELETE )
			{
				task.logs.remove( entryName ) ;
				doCallback( task, entryPath, ctype ) ;
				return ;
			}

			Log recorded = task.logs.get( entryName ) ;
			if( recorded == null )
			{
				task.logs.put( entryName, new Log( ctype, System.currentTimeMillis() ) ) ;
			}
			else
			{
				// keep only the latest kind and restart the quiet period
				recorded.type = ctype ;
				recorded.timestamp = System.currentTimeMillis() ;
			}
		}

		/** Walks the queue once: removes canceled tasks, registers pending ones, flushes watching ones. */
		private void maintainTasks()
		{
			Iterator<WatchTask> iter = tasks.iterator() ;
			while( iter.hasNext() )
			{
				WatchTask task = iter.next() ;
				if( task.canceled )
				{
					unregister( task ) ;
					iter.remove();
				}
				else if( task.status == WatchTask.PENDING )
					register( task ) ;
				else if( task.status == WatchTask.WATCHING )
					flushSettled( task ) ;
			}
		}

		/**
		 * Registers a pending task's directory with the watch service. The task
		 * stays pending, and is retried on the next pass, when it has no
		 * directory or the registration fails.
		 */
		private void register( WatchTask task )
		{
			if( task.dir == null )
				return ;

			// Tasks built without explicit kinds would otherwise pass null to register().
			Kind<?>[] kinds = DEFAULT_EVENTS ;
			if( task.events != null && task.events.length > 0 )
				kinds = task.events ;

			try
			{
				WatchKey newkey = task.dir.register( watcher, kinds ) ;
				task.key = newkey ;
				task.status = WatchTask.WATCHING ;

				HashSet<WatchTask> listeners = keyTasks.get( newkey ) ;
				if( listeners == null )
				{
					listeners = new HashSet<FileSystemWatcher.WatchTask>() ;
					keyTasks.put( newkey, listeners ) ;
				}
				listeners.add( task ) ;
			}
			catch( NoSuchFileException ignored )
			{
				// Directory is not there (yet): intentionally silent, retried next pass.
			}
			catch (IOException e)
			{
				LOG.error( "Error occured, {}", e.getMessage() );
				if( LOG.isDebugEnabled() )
					LOG.error(null, e );
			}
		}

		/** Delivers the recorded events of a reducing task whose entry has been quiet for longer than the interval. */
		private void flushSettled( WatchTask task )
		{
			if( !task.reduceEvents || task.logs.isEmpty() )
				return ;

			long now = System.currentTimeMillis() ;
			Iterator<Map.Entry<String, Log>> pending = task.logs.entrySet().iterator() ;
			while( pending.hasNext() )
			{
				Map.Entry<String, Log> entry = pending.next() ;
				Log recorded = entry.getValue() ;
				if( now - recorded.timestamp > task.interval )
				{
					String entryName = entry.getKey() ;
					pending.remove();
					doCallback( task, task.dir.resolve( entryName ), recorded.type ) ;
				}
			}
		}

		/**
		 * Detaches a canceled task. The watch key is canceled only when no other
		 * task still shares it, so canceling one task does not silence the rest.
		 */
		private void unregister( WatchTask task )
		{
			WatchKey key = task.key ;
			if( key != null )
			{
				HashSet<WatchTask> listeners = keyTasks.get( key ) ;
				if( listeners != null )
					listeners.remove( task ) ;

				if( listeners == null || listeners.isEmpty() )
				{
					key.cancel();
					keyTasks.remove( key ) ;
				}
			}
			task.status = WatchTask.CANCELED ;
		}
	}

	/** Most recent not-yet-delivered event of one entry: its callback type and when it was seen. */
	private static final class Log
	{
		int type;
		long timestamp ;

		Log( int type, long timestamp )
		{
			this.type = type ;
			this.timestamp = timestamp ;
		}
	}

	/**
	 * One watch request: a directory, optional name filters, the callback to
	 * invoke and delivery settings. Instances are bound to the enclosing
	 * watcher, whose file system resolves string paths.
	 */
	public class WatchTask
	{
		private static final int PENDING = 0 ;
		private static final int WATCHING = 1 ;
		private static final int CANCELED = 2 ;

		private String absoluteDir ;
		private Path dir ;
		private Pattern pattern ;
		private String suffix ;
		/** Entries whose name ends with this string are ignored. */
		private String tmp ;
		private Callback callback ;
		/** When true, create/modify events are held back until the entry has been quiet for {@link #interval}. */
		private boolean reduceEvents = true ;
		/** Quiet period in milliseconds used when {@link #reduceEvents} is set. */
		private long interval = 5000 ;

		private Kind<Path>[] events ;
		/** Held-back events by entry name; only touched by the watcher thread. */
		private Map<String, Log> logs = new HashMap<String, Log>() ;
		private WatchKey key ;
		/** Lifecycle state; written only by the watcher thread. */
		private volatile int status = PENDING ;
		/** Set by {@link #cancel()}; kept apart from {@link #status} so the watcher thread cannot overwrite it. */
		private volatile boolean canceled = false ;

		public WatchTask(){}

		/**
		 * Creates a task for the directory named {@code dir}, resolved against
		 * the enclosing watcher's file system. A non-empty {@code suffix} is
		 * normalized the same way as in {@link #setSuffix(String)}.
		 */
		public WatchTask( String dir, String suffix, Callback callback )
		{
			this.dir = fs.getPath(dir) ;
			if( suffix == null || suffix.isEmpty() )
				this.suffix = suffix ;
			else
				this.suffix = StringUtils.trimLeading( suffix, '*' );
			this.callback = callback ;
			this.absoluteDir = this.dir.toAbsolutePath().toString() ;
		}

		public Path getDir()
		{
			return dir;
		}
		/** Sets the directory to watch and caches its absolute path string. */
		public void setDir(Path dir)
		{
			this.dir = dir;
			this.absoluteDir = this.dir.toAbsolutePath().toString() ;
		}
		public Pattern getPattern()
		{
			return pattern;
		}
		/** Sets the pattern an entry name must fully match (alternative to the suffix). */
		public void setPattern(Pattern pattern)
		{
			this.pattern = pattern;
		}
		public String getSuffix()
		{
			return suffix;
		}
		/**
		 * Sets the name suffix to accept, after passing it through
		 * {@code StringUtils.trimLeading(suffix, '*')} (presumably stripping
		 * leading {@code '*'} so that {@code "*.xml"} becomes {@code ".xml"} -
		 * confirm against that helper). A {@code null} or empty argument leaves
		 * the current suffix untouched.
		 */
		public void setSuffix(String suffix)
		{
			if( suffix == null || suffix.isEmpty() )
				return ;
			this.suffix = StringUtils.trimLeading( suffix, '*' );
		}
		public Callback getCallback()
		{
			return callback;
		}
		public void setCallback(Callback callback)
		{
			this.callback = callback;
		}
		/** Returns the event kinds to register for, or {@code null} when none were set. */
		public Kind<Path>[] getEvents()
		{
			return events;
		}
		/** Sets the event kinds to register for; {@code null} or empty selects the default kinds. */
		public void setEvents(Kind<Path>[] events)
		{
			this.events = events;
		}
		public String getTmp()
		{
			return tmp;
		}
		/** Sets the name ending that marks entries to ignore; {@code null} or empty disables the check. */
		public void setTmp(String tmp)
		{
			this.tmp = tmp;
		}
		public boolean isReduceEvents()
		{
			return reduceEvents;
		}
		public void setReduceEvents(boolean reduceEvents)
		{
			this.reduceEvents = reduceEvents;
		}
		/**
		 * Requests cancellation. The watcher thread stops delivering events to
		 * this task and removes it on its next pass over the task queue.
		 */
		public void cancel()
		{
			this.canceled = true ;
		}
		public long getInterval()
		{
			return interval;
		}
		/** Sets the quiet period, in milliseconds, used when events are reduced. */
		public void setInterval(long interval)
		{
			this.interval = interval;
		}
	}

	/** Receiver of watch events; invoked on the watcher thread. */
	public interface Callback
	{
		/** Event kind other than create, modify or delete. */
		public static final int OTHER = 0 ;
		public static final int CREATE = 1 ;
		public static final int MODIFY = 2 ;
		public static final int DELETE = 4 ;

		/**
		 * @param path  the task's directory resolved against the affected entry name
		 * @param event one of {@link #OTHER}, {@link #CREATE}, {@link #MODIFY}, {@link #DELETE}
		 */
		public void process( Path path, int event ) ;
	}
}
